package main

import (
	"context"
	"dash/cmd/data/oldmessage"
	"dash/messages"
	"database/sql"
	"encoding/json"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/olivere/elastic/v7"
	"math/rand"
	"time"
)

type ChatroomMember struct {
	ChatroomId string
	memberId   string
	alias      string
	quit       bool
	joinMethod string
	inviterId  string
}

func newTopic(id, pid, name string, isDfault bool, keywords []string) *messages.Topic {
	return &messages.Topic{
		Id:           id,
		ParentId:     pid,
		IsDefault:    isDfault,
		KeywordScore: rand.Float64() * 5,
		Keywords:     keywords,
		Name:         name,
		Score:        rand.Float64() * 5,
	}
}

func NewTopicOBJ(keyword []string) map[string]*messages.Topic {
	topicMap := map[string]*messages.Topic{}
	topicMap["001"] = newTopic("001", "", "民生", true, keyword)
	topicMap["002"] = newTopic("002", "", "警务", true, keyword)
	topicMap["003"] = newTopic("003", "001", "社情民意", true, keyword)
	topicMap["004"] = newTopic("004", "001", "民生", true, keyword)
	topicMap["005"] = newTopic("005", "001", "肺炎", false, keyword)
	topicMap["006"] = newTopic("006", "002", "就业", false, keyword)
	topicMap["007"] = newTopic("007", "002", "养老", false, keyword)
	topicMap["008"] = newTopic("008", "002", "保险", true, keyword)
	return topicMap
}

func randomTopic(randmNum int, topics []messages.Topic, keywords []string) []messages.Topic {
	if randmNum == rand.Intn(3) {
		return topics
	}
	topicObjMap := NewTopicOBJ(keywords)

	i := 1
	for _, obj := range topicObjMap {
		topics = append(topics, *obj)
		if i > rand.Intn(3) {
			return topics
		}
		i++
	}
	return topics
}

func Old2new(message oldmessage.Message) messages.Message {
	typeStr := "text"
	dataSource := "微信"
	topicIds := []string{"001", "002", "003", "004", "005", "006", "007", "008", "009"}
	topics := make([]messages.Topic, len(message.Topics))
	now := time.Now()
	monthAgo := now.Add(-30*24*time.Hour).UnixNano() / 1e6
	for i, topic := range message.Topics {
		topics[i] = messages.Topic{
			Id:           topic.Name,
			KeywordScore: rand.Float64() * 5,
			Keywords:     message.TopicKeywords,
			Name:         topic.Name,
			Score:        rand.Float64() * 5,
			ParentId:     topicIds[rand.Intn(len(topicIds))],
		}
	}
	suijicishu := rand.Intn(3)
	topics = randomTopic(suijicishu, topics, message.TopicKeywords)

	ts := rand.Int63()%30*24*3600000 + monthAgo
	newMessage := messages.Message{
		Id:               message.Id,
		Type:             typeStr,
		Content:          message.MessageDetail.Text,
		Timestamp:        ts,
		CaptureTimestamp: message.MessageDetail.CaptureTime,
		DataSource:       dataSource,
		Topics:           topics,
		Member: messages.Member{
			Id:       message.Member.Id,
			NickName: message.Member.Nickname,
			Sex:      message.Member.Sex,
		},
		Chatroom: messages.Chatroom{
			Id:   message.Chatroom.Id,
			Name: message.Chatroom.Name,
		},
	}
	newMessage.WarningScore = 0
	for _, topic := range newMessage.Topics {
		if topic.IsDefault == true {
			newMessage.WarningScore += rand.Float64()
		}
	}
	return newMessage
}

func SetMessageMemberChatroom(allMembers map[string]*messages.Member, allMessages []*messages.Message, allChatrooms map[string]*messages.Chatroom) []*messages.Message {
	for _, msg := range allMessages {
		member, exists := allMembers[msg.Member.Id]
		if !exists {
			fmt.Println("该条消息上的member不在member索引中,已忽略该问题")
		}
		msg.Member = *member
		chatroom, exists := allChatrooms[msg.Chatroom.Id]
		if !exists {
			fmt.Println("该条消息上的chatroom不在chatroom索引中,已跳过")
			continue
		}
		msg.Chatroom = *chatroom
	}
	return allMessages
}

func main() {
	var chatroomMembers []ChatroomMember
	joinMethod := []string{"扫码", "邀请"}
	ctx := context.Background()
	oldEsClient, err := elastic.NewSimpleClient(elastic.SetURL("http://192.168.31.179:39200"))
	if err != nil {
		panic(err)
	}
	fmt.Println("old es client")
	newEsClient, err := elastic.NewSimpleClient(elastic.SetURL("http://192.168.31.36:19200"))
	if err != nil {
		panic(err)
	}
	_ = newEsClient
	fmt.Println("new es client")
	mysqlDB, err := sql.Open("mysql", "root:nil@tcp(192.168.31.36:13306)/dash")
	if err != nil {
		panic(err)
	}
	fmt.Println("mysql db")
	defer func() {
		_ = mysqlDB.Close()
	}()
	searchResult, err := oldEsClient.
		Search("message").
		Size(10000).
		Sort("message.timestamp", true).
		Query(elastic.NewBoolQuery().Must(elastic.NewExistsQuery("message.text"))).
		Do(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(searchResult.TotalHits())
	oldMessages := make([]*oldmessage.Message, 0, 10000)
	oldMembers := map[string]*oldmessage.Member{}
	oldChatrooms := map[string]*oldmessage.Chatroom{}
	var newMessages []*messages.Message
	newMembers := map[string]*messages.Member{}
	newChatrooms := map[string]*messages.Chatroom{}
	for _, hit := range searchResult.Hits.Hits {
		var message oldmessage.Message
		if err := json.Unmarshal(hit.Source, &message); err != nil {
			panic(err)
		}
		oldChatrooms[message.Chatroom.Id] = &message.Chatroom
		oldMembers[message.Member.Id] = &message.Member
		oldMessages = append(oldMessages, &message)
		newMessage := Old2new(message)
		newMessages = append(newMessages, &newMessage)
	}
	chatroomIds := make([]string, 0, len(oldChatrooms))
	for id := range oldChatrooms {
		chatroomIds = append(chatroomIds, id)
	}
	searchResult, err = oldEsClient.
		Search("chatroom").
		Query(elastic.NewIdsQuery().Ids(chatroomIds...)).
		Size(len(chatroomIds)).
		Do(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(searchResult.Hits.Hits), len(chatroomIds))
	//if len(searchResult.Hits.Hits) != len(chatroomIds) {
	//	panic("群数量不匹配")
	//}
	for _, hit := range searchResult.Hits.Hits {
		var chatroom oldmessage.Chatroom
		if err := json.Unmarshal(hit.Source, &chatroom); err != nil {
			panic(err)
		}
		oldChatrooms[chatroom.Id] = &chatroom

		if _, exists := oldMembers[chatroom.OwnerId]; !exists {
			oldMembers[chatroom.OwnerId] = &oldmessage.Member{
				Id:       chatroom.OwnerId,
				Nickname: chatroom.OwnerNickname,
			}
			chatroomMember := ChatroomMember{
				ChatroomId: chatroom.OwnerId,
				memberId:   chatroom.OwnerId,
				alias:      chatroom.OwnerNickname,
				quit:       false,
				joinMethod: "群主",
				inviterId:  "",
			}
			chatroomMembers = append(chatroomMembers, chatroomMember)
		}
	}
	memberIds := make([]string, 0, len(oldMembers))
	for _, member := range oldMembers {
		memberIds = append(memberIds, member.Id)
	}

	searchResult, err = oldEsClient.
		Search("member").
		Query(elastic.NewIdsQuery().Ids(memberIds...)).
		Size(len(memberIds)).
		Do(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println("oldMembers:", len(searchResult.Hits.Hits), len(memberIds))
	for _, hit := range searchResult.Hits.Hits {
		var member oldmessage.Member
		if err := json.Unmarshal(hit.Source, &member); err != nil {
			panic(err)
		}
		oldMembers[member.Id] = &member
	}
	now := time.Now()
	monthAgo := now.Add(-30*24*time.Hour).UnixNano() / 1e6
	i := 1
	collectorIds := []string{"001", "002"}
	for _, member := range oldMembers {
		num := rand.Intn(2)
		ts := rand.Int63()%30*24*3600000 + monthAgo
		newMembers[member.Id] = &messages.Member{
			Id:               member.Id,
			NickName:         member.Nickname,
			Age:              20,
			Sex:              member.Sex,
			Avatar:           fmt.Sprintf("/avatar/%d.jpg", i),
			CaptureTimestamp: ts,
			UsedNickname:     []string{member.Nickname},
			DataSource:       "微信",
			Address:          []string{"北京", "朝阳"},
			Watched:          member.Watched,
			Tags:             member.Tags,
			Signature:        member.Signature,
			CollectorId:      collectorIds[num],
			Email:            fmt.Sprintf("%s/@163.com", i),
			UsedSignature:    []string{member.Signature},
		}
		//member.Avatar = fmt.Sprintf("/avatar/%d.jpg", i)
		//member.Sex = "男"
		i++
	}
	//读取mysql category
	rows, err := mysqlDB.Query(`SELECT id FROM category WHERE not parentId="";`)
	if err != nil {
		fmt.Println("get category error", err)
	}
	var categoryIds []string
	for rows.Next() {
		var categoryId string
		err := rows.Scan(&categoryId)
		if err != nil {
			fmt.Println("category error", err)
		}
		categoryIds = append(categoryIds, categoryId)
	}
	for _, chatroom := range oldChatrooms {
		ts := rand.Int63()%30*24*3600000 + monthAgo
		var num int
		var catId string
		if len(categoryIds) > 0 {
			num = rand.Intn(len(categoryIds))
			catId = categoryIds[num]
		}
		num2 := rand.Intn(2)
		c := messages.Chatroom{
			Id:               chatroom.Id,
			Name:             chatroom.Name,
			Owner:            messages.Member{Id: chatroom.OwnerId, NickName: chatroom.OwnerNickname},
			Avatar:           fmt.Sprintf("/avatar/%d.jpg", i),
			CollectorId:      []string{collectorIds[num2]},
			CaptureTimestamp: ts,
			DataSource:       "微信",
			Watched:          false,
			Tags:             chatroom.Tags,
			Comment:          chatroom.Comment,
			UsedName:         []string{chatroom.Name},
			Category:         catId,
		}
		newChatrooms[chatroom.Id] = &c
		i++
	}

	//修改message中member,chatroom
	chatroomMemberRelation := make(map[string]ChatroomMember)
	allMembers := make(map[string][]string)
	for _, newMsg := range newMessages {
		//set message
		allMembers[newMsg.Chatroom.Id] = append(allMembers[newMsg.Chatroom.Id], newMsg.Member.Id)
		num := rand.Intn(2)
		relation := fmt.Sprint("s%s%", newMsg.Chatroom.Id, newMsg.Member.Id)
		if _, exists := chatroomMemberRelation[relation]; exists {
			continue
		}
		ts := rand.Int63()%30*24*3600000 + monthAgo
		newMsg.Timestamp = ts
		newMsg.CaptureTimestamp = ts
		newMsg.Type = "text"
		newMsg.Watched = false
		newMsg.CollectorId = collectorIds[rand.Intn(2)]
		newMsg.Watched = true

		//get 人群关系;设置邀请人
		chatroomMem := ChatroomMember{
			ChatroomId: newMsg.Chatroom.Id,
			memberId:   newMsg.Member.Id,
			alias:      newMsg.Member.NickName,
			quit:       false,
			joinMethod: joinMethod[num],
		}
		if num == 1 {
			suijishu := rand.Intn(len(allMembers[newMsg.Chatroom.Id]))
			chatroomMem.inviterId = allMembers[newMsg.Chatroom.Id][suijishu]
		}
		chatroomMemberRelation[relation] = chatroomMem
		chatroomMembers = append(chatroomMembers, chatroomMem)
	}

	newMessages = SetMessageMemberChatroom(newMembers, newMessages, newChatrooms)
	memberStmt, err := mysqlDB.Prepare(`INSERT INTO member 
    (id,
     nickname,
     age,
     sex,
     avatar,
     captureTimestamp,
     usedNickname,
     dataSource,
     address,
     watched,
     tags,
     signature,
     usedSignature)
     values(?,?,?,?,?,?,?,?,?,?,?,?,?);`)
	if err != nil {
		fmt.Printf("member err")
		panic(err)
	}
	defer func() {
		_ = memberStmt.Close()
	}()
	chatroomStmt, err := mysqlDB.Prepare(`INSERT INTO 
    chatroom (id,name,dataSource,avatar,ownerId,
              usedName,watched,captureTimestamp,tags,comment,categoryId)VALUES(?,?,?,?,?,?,?,?,?,?,?); `)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = chatroomStmt.Close()
	}()
	chatroom_memberstmt, err := mysqlDB.Prepare(`
		INSERT INTO chatroom_member (chatroomId,memberId,alias,quit,joinMethod,inviterId)values(?,?,?,?,?,?) ;`)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = chatroom_memberstmt.Close()
	}()
	collectorMemberStmt, err := mysqlDB.Prepare(`
		INSERT INTO collector_member 
		    (collectorId,memberId)
		VALUES (?,?)`)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = collectorMemberStmt.Close()
	}()
	collectorChatroomStmt, err := mysqlDB.Prepare(`
		INSERT INTO collector_chatroom 
		    (collectorId,chatroomId)
		VALUES (?,?)`)
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = collectorMemberStmt.Close()
	}()

	//save member//collector_member
	for _, m := range newMembers {
		nicknames := convertJson(m.UsedNickname)
		signatures := convertJson(m.UsedSignature)
		addresses := convertJson(m.Address)
		tags := convertJson(m.Tags)
		_, err := memberStmt.Exec(m.Id, m.NickName, m.Age, m.Sex, m.Avatar,
			m.CaptureTimestamp, nicknames, m.DataSource, addresses, m.Watched, tags, m.Signature, signatures)
		if err != nil {
			fmt.Println("member error:", err)
			continue
		}
		_, err = collectorMemberStmt.Exec(m.CollectorId, m.Id)
		if err != nil {
			fmt.Println("member coll error:", err)
			continue
		}
	}
	fmt.Println("insert member")

	fmt.Println("insert collector member")
	//save chatrooms//collector_chatroom
	for _, c := range newChatrooms {
		names := convertJson(c.UsedName)
		tags := convertJson(c.Tags)
		_, err = chatroomStmt.Exec(c.Id, c.Name, c.DataSource, c.Avatar, c.Owner.Id, names,
			c.Watched, c.CaptureTimestamp, tags, c.Comment, c.Category)
		if err != nil {
			fmt.Println("chatroom error:", err)
			continue
		}
		for _, cid := range c.CollectorId {
			_, err = collectorChatroomStmt.Exec(cid, c.Id)
			if err != nil {
				fmt.Println("collector chatroom error:", err)
				continue
			}
		}
	}

	fmt.Println("insert chatrom")
	fmt.Println("insert collector chatrom")
	//save chatroom_member
	for _, cm := range chatroomMembers {
		_, err := chatroom_memberstmt.Exec(cm.ChatroomId, cm.memberId,
			cm.alias, cm.quit, cm.joinMethod, cm.inviterId)
		if err != nil {
			fmt.Println("chatroom member error:", err)
			continue
		}
	}
	fmt.Println("insert collector member")

	for _, msg := range newMessages {
		if member, exists := newMembers[msg.Member.Id]; exists {
			msg.Member.Avatar = member.Avatar
		}
	}
	//save es
	var requests []elastic.BulkableRequest
	for _, msg := range newMessages {
		requests = append(requests, elastic.NewBulkIndexRequest().Index("message").Id(msg.Id).Doc(msg))
		if len(msg.Topics) > 0 {
			requests = append(requests, elastic.NewBulkIndexRequest().Index("alarm_message").Id(msg.Id).Doc(msg))
		}
	}
	_, err = newEsClient.Bulk().Add(requests...).Do(ctx)
	if err != nil {
		panic(err)
	}
}

func convertJson(str interface{}) []byte {
	jsonStr, err := json.Marshal(str)
	if err != nil {
		fmt.Printf("json marshal error:s%", err)
	}
	return jsonStr
}
